// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/rowset/segment_v2/index_file_reader.h"

#include <memory>
#include <utility>

#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/tablet_schema.h"
#include "util/debug_points.h"

namespace doris::segment_v2 {

Status IndexFileReader::init(int32_t read_buffer_size, const io::IOContext* io_ctx) {
    std::unique_lock<std::shared_mutex> lock(_mutex); // Lock for writing
    if (!_inited) {
        _read_buffer_size = read_buffer_size;
        if (_storage_format >= InvertedIndexStorageFormatPB::V2) {
            RETURN_IF_ERROR(_init_from(read_buffer_size, io_ctx));
        }
        _inited = true;
    }
    return Status::OK();
}

Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext* io_ctx) {
    auto index_file_full_path = InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);

    try {
        CLuceneError err;
        CL_NS(store)::IndexInput* index_input = nullptr;

        // 1. get file size from meta
        int64_t file_size = -1;
        if (_idx_file_info.has_index_size()) {
            file_size = _idx_file_info.index_size();
        }
        file_size = file_size == 0 ? -1 : file_size;

        DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
            if (file_size == -1) {
                return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                        "CLuceneError occur file size = -1, file is {}", index_file_full_path);
            }
        })

        DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: "
                               << index_file_full_path;
        // 2. open file
        auto ok = DorisFSDirectory::FSIndexInput::open(
                _fs, index_file_full_path.c_str(), index_input, err, read_buffer_size, file_size);
        if (!ok) {
            if (err.number() == CL_ERR_FileNotFound) {
                return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
                        "inverted index file {} is not found.", index_file_full_path);
            } else if (err.number() == CL_ERR_EmptyIndexSegment) {
                return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>(
                        "inverted index file {} is empty.", index_file_full_path);
            }
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                    "CLuceneError occur when open idx file {}, error msg: {}", index_file_full_path,
                    err.what());
        }
        _stream = std::unique_ptr<CL_NS(store)::IndexInput>(index_input);
        _stream->setIoContext(io_ctx);
        _stream->setIndexFile(true);

        // 3. read file
        int32_t version = _stream->readInt(); // Read version number
        if (version >= InvertedIndexStorageFormatPB::V2) {
            DCHECK(version == _storage_format);
            int32_t numIndices = _stream->readInt(); // Read number of indices

            for (int32_t i = 0; i < numIndices; ++i) {
                int64_t indexId = _stream->readLong();      // Read index ID
                int32_t suffix_length = _stream->readInt(); // Read suffix length
                std::vector<uint8_t> suffix_data(suffix_length);
                _stream->readBytes(suffix_data.data(), suffix_length);
                std::string suffix_str(suffix_data.begin(), suffix_data.end());

                int32_t numFiles = _stream->readInt(); // Read number of files in the index

                auto fileEntries = std::make_unique<EntriesType>();
                fileEntries->reserve(numFiles);

                for (int32_t j = 0; j < numFiles; ++j) {
                    int32_t file_name_length = _stream->readInt();
                    std::string file_name(file_name_length, '\0');
                    _stream->readBytes(reinterpret_cast<uint8_t*>(file_name.data()),
                                       file_name_length);
                    auto entry = std::make_unique<ReaderFileEntry>();
                    entry->file_name = std::move(file_name);
                    entry->offset = _stream->readLong();
                    entry->length = _stream->readLong();
                    fileEntries->emplace(entry->file_name, std::move(entry));
                }

                _indices_entries.emplace(std::make_pair(indexId, std::move(suffix_str)),
                                         std::move(fileEntries));
            }
        } else {
            return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                    "unknown inverted index format {}", version);
        }
    } catch (CLuceneError& err) {
        if (_stream != nullptr) {
            try {
                _stream->close();
            } catch (CLuceneError& err) {
                return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                        "CLuceneError occur when close idx file {}, error msg: {}",
                        index_file_full_path, err.what());
            }
        }
        return Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                "CLuceneError occur when init idx file {}, error msg: {}", index_file_full_path,
                err.what());
    }
    return Status::OK();
}

Result<InvertedIndexDirectoryMap> IndexFileReader::get_all_directories() {
    InvertedIndexDirectoryMap res;
    std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
    for (auto& [index, _] : _indices_entries) {
        auto&& [index_id, index_suffix] = index;
        LOG(INFO) << "index_id:" << index_id << " index_suffix:" << index_suffix;
        auto ret = _open(index_id, index_suffix);
        if (!ret.has_value()) {
            return ResultError(ret.error());
        }
        res.emplace(std::make_pair(index_id, index_suffix), std::move(ret.value()));
    }
    return res;
}

Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::_open(
        int64_t index_id, const std::string& index_suffix, const io::IOContext* io_ctx) const {
    std::unique_ptr<DorisCompoundReader, DirectoryDeleter> compound_reader;

    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
        auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
                _index_path_prefix, index_id, index_suffix);
        try {
            CLuceneError err;
            CL_NS(store)::IndexInput* index_input = nullptr;

            // 1. get file size from meta
            int64_t file_size = -1;
            if (_idx_file_info.index_info_size() > 0) {
                for (const auto& idx_info : _idx_file_info.index_info()) {
                    if (index_id == idx_info.index_id() &&
                        index_suffix == idx_info.index_suffix()) {
                        file_size = idx_info.index_file_size();
                        break;
                    }
                }
            }
            file_size = file_size == 0 ? -1 : file_size;
            DBUG_EXECUTE_IF("file_size_not_in_rowset_meta ", {
                if (file_size == -1) {
                    return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                            "CLuceneError occur file size = -1, file is {}", index_file_path));
                }
            })
            DCHECK(_fs != nullptr)
                    << "file system is nullptr, index_file_path: " << index_file_path;
            // 2. open file
            auto ok = DorisFSDirectory::FSIndexInput::open(
                    _fs, index_file_path.c_str(), index_input, err, _read_buffer_size, file_size);
            if (!ok) {
                // now index_input = nullptr
                if (err.number() == CL_ERR_FileNotFound) {
                    return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
                            "inverted index file {} is not found.", index_file_path));
                }
                return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                        "CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
                        err.what()));
            }

            // 3. read file in DorisCompoundReader
            compound_reader.reset(new DorisCompoundReader(index_input, _read_buffer_size));
        } catch (CLuceneError& err) {
            return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_CLUCENE_ERROR>(
                    "CLuceneError occur when open idx file {}, error msg: {}", index_file_path,
                    err.what()));
        }
    } else {
        std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
        if (_stream == nullptr) {
            return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
                    "CLuceneError occur when open idx file {}, stream is nullptr",
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix)));
        }

        // Check if the specified index exists
        auto index_it = _indices_entries.find(std::make_pair(index_id, index_suffix));
        if (index_it == _indices_entries.end()) {
            std::ostringstream errMsg;
            errMsg << "No index with id " << index_id << " found";
            return ResultError(Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
                    "CLuceneError occur when open idx file {}, error msg: {}",
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix),
                    errMsg.str()));
        }
        // Need to clone resource here, because index searcher cache need it.
        compound_reader.reset(new DorisCompoundReader(_stream->clone(), *index_it->second,
                                                      _read_buffer_size, io_ctx));
    }
    return compound_reader;
}

Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::open(
        const TabletIndex* index_meta, const io::IOContext* io_ctx) const {
    auto index_id = index_meta->index_id();
    auto index_suffix = index_meta->get_index_suffix();
    return _open(index_id, index_suffix, io_ctx);
}

std::string IndexFileReader::get_index_file_cache_key(const TabletIndex* index_meta) const {
    return InvertedIndexDescriptor::get_index_file_cache_key(
            _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
}

std::string IndexFileReader::get_index_file_path(const TabletIndex* index_meta) const {
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
        return InvertedIndexDescriptor::get_index_file_path_v1(
                _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
    }
    return InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix);
}

Status IndexFileReader::index_file_exist(const TabletIndex* index_meta, bool* res) const {
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
        auto index_file_path = InvertedIndexDescriptor::get_index_file_path_v1(
                _index_path_prefix, index_meta->index_id(), index_meta->get_index_suffix());
        return _fs->exists(index_file_path, res);
    } else {
        std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
        if (_stream == nullptr) {
            *res = false;
            return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
                    "idx file {} is not opened",
                    InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix));
        }
        // Check if the specified index exists
        auto index_it = _indices_entries.find(
                std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()));
        if (index_it == _indices_entries.end()) {
            *res = false;
        } else {
            *res = true;
        }
    }
    return Status::OK();
}

Status IndexFileReader::has_null(const TabletIndex* index_meta, bool* res) const {
    if (_storage_format == InvertedIndexStorageFormatPB::V1) {
        *res = true;
        return Status::OK();
    }
    std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
    if (_stream == nullptr) {
        return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
                "idx file {} is not opened",
                InvertedIndexDescriptor::get_index_file_path_v2(_index_path_prefix));
    }
    // Check if the specified index exists
    auto index_it = _indices_entries.find(
            std::make_pair(index_meta->index_id(), index_meta->get_index_suffix()));
    if (index_it == _indices_entries.end()) {
        *res = false;
    } else {
        const auto& entries = index_it->second;
        auto entry_it =
                entries->find(InvertedIndexDescriptor::get_temporary_null_bitmap_file_name());
        if (entry_it == entries->end()) {
            *res = false;
            return Status::OK();
        }
        const auto& e = entry_it->second;
        // roaring bitmap cookie header size is 5
        if (e->length <= 5) {
            *res = false;
        } else {
            *res = true;
        }
    }
    return Status::OK();
}

void IndexFileReader::debug_file_entries() {
    std::shared_lock<std::shared_mutex> lock(_mutex); // Lock for reading
    for (const auto& index : _indices_entries) {
        LOG(INFO) << "index_id:" << index.first.first;
        const auto& index_entries = index.second;
        for (const auto& entry : *index_entries) {
            const auto& file_entry = entry.second;
            LOG(INFO) << "file entry name:" << file_entry->file_name
                      << " length:" << file_entry->length << " offset:" << file_entry->offset;
        }
    }
}

} // namespace doris::segment_v2
